From: Jeroen van der Heijden Date: Mon, 29 Oct 2018 16:11:41 +0000 (+0100) Subject: ondata X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~7^2~2^2~22^2~6 X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/success//%22http:/www.example.com/cgi/success/?a=commitdiff_plain;h=def33a1b807cc5bb3e22ae2f1b9995886197b03b;p=siridb-server.git ondata --- diff --git a/Debug/src/siri/db/subdir.mk b/Debug/src/siri/db/subdir.mk index 57a30dea..c294f8c2 100644 --- a/Debug/src/siri/db/subdir.mk +++ b/Debug/src/siri/db/subdir.mk @@ -38,6 +38,7 @@ C_SRCS += \ ../src/siri/db/shard.c \ ../src/siri/db/shards.c \ ../src/siri/db/tasks.c \ +../src/siri/db/tee.c \ ../src/siri/db/time.c \ ../src/siri/db/user.c \ ../src/siri/db/users.c \ @@ -79,6 +80,7 @@ OBJS += \ ./src/siri/db/shard.o \ ./src/siri/db/shards.o \ ./src/siri/db/tasks.o \ +./src/siri/db/tee.o \ ./src/siri/db/time.o \ ./src/siri/db/user.o \ ./src/siri/db/users.o \ @@ -120,6 +122,7 @@ C_DEPS += \ ./src/siri/db/shard.d \ ./src/siri/db/shards.d \ ./src/siri/db/tasks.d \ +./src/siri/db/tee.d \ ./src/siri/db/time.d \ ./src/siri/db/user.d \ ./src/siri/db/users.d \ diff --git a/Release/src/siri/db/subdir.mk b/Release/src/siri/db/subdir.mk index 96d8a768..154d0843 100644 --- a/Release/src/siri/db/subdir.mk +++ b/Release/src/siri/db/subdir.mk @@ -38,6 +38,7 @@ C_SRCS += \ ../src/siri/db/shard.c \ ../src/siri/db/shards.c \ ../src/siri/db/tasks.c \ +../src/siri/db/tee.c \ ../src/siri/db/time.c \ ../src/siri/db/user.c \ ../src/siri/db/users.c \ @@ -79,6 +80,7 @@ OBJS += \ ./src/siri/db/shard.o \ ./src/siri/db/shards.o \ ./src/siri/db/tasks.o \ +./src/siri/db/tee.o \ ./src/siri/db/time.o \ ./src/siri/db/user.o \ ./src/siri/db/users.o \ @@ -120,6 +122,7 @@ C_DEPS += \ ./src/siri/db/shard.d \ ./src/siri/db/shards.d \ ./src/siri/db/tasks.d \ +./src/siri/db/tee.d \ ./src/siri/db/time.d \ ./src/siri/db/user.d \ ./src/siri/db/users.d \ diff --git a/grammar/export_grammar.py b/grammar/export_grammar.py index 22e14bc0..b0b1d72d 100755 --- a/grammar/export_grammar.py +++ b/grammar/export_grammar.py @@ -4,8 +4,8 @@ Author: Jeroen van der Heijden (Transceptor Technology) Date: 2016-10-10 ''' -# import sys -# sys.path.insert(0, '../../pyleri/') +import sys +sys.path.insert(0, '../../pyleri/') import os from grammar import siri_grammar from pyleri import Grammar diff --git a/grammar/grammar.py b/grammar/grammar.py index 3f2affa7..5f6d96ff 100644 --- a/grammar/grammar.py +++ b/grammar/grammar.py @@ -159,6 +159,7 @@ class SiriGrammar(Grammar): Keyword('symmetric_difference'), most_greedy=False) k_sync_progress = Keyword('sync_progress') + k_tee_pipe_name = Keyword('tee_pipe_name') k_timeit = Keyword('timeit') k_timezone = Keyword('timezone') k_time_precision = Keyword('time_precision') @@ -277,6 +278,7 @@ class SiriGrammar(Grammar): k_reindex_progress, k_selected_points, k_sync_progress, + k_tee_pipe_name, k_uptime, most_greedy=False), ',', 1) @@ -370,6 +372,7 @@ class SiriGrammar(Grammar): k_status, k_reindex_progress, k_sync_progress, + k_tee_pipe_name, most_greedy=False), str_operator, string), Sequence(k_online, bool_operator, _boolean), Sequence(k_log_level, int_operator, log_keywords), @@ -562,6 +565,10 @@ class SiriGrammar(Grammar): Optional(Sequence(k_using, aggregate_functions))) set_address = Sequence(k_set, k_address, string) + set_tee_pipe_name = Sequence(k_set, k_tee_pipe_name, Choice( + k_false, + string, + most_greedy=False)) set_backup_mode = Sequence(k_set, k_backup_mode, _boolean) set_drop_threshold = Sequence(k_set, k_drop_threshold, r_float) set_expression = Sequence(k_set, k_expression, r_regex) @@ -590,11 +597,15 @@ class SiriGrammar(Grammar): alter_server = Sequence(k_server, uuid, Choice( set_log_level, set_backup_mode, + set_tee_pipe_name, set_address, set_port, most_greedy=False)) - alter_servers = Sequence(k_servers, Optional(where_server), set_log_level) + alter_servers = Sequence(k_servers, Optional(where_server), Choice( + set_log_level, + set_tee_pipe_name, + most_greedy=False)) alter_user = Sequence(k_user, string, Choice( set_password, @@ -764,6 +775,7 @@ class SiriGrammar(Grammar): k_startup_time, k_status, k_sync_progress, + k_tee_pipe_name, k_time_precision, k_timezone, k_uptime, diff --git a/include/siri/db/db.h b/include/siri/db/db.h index 1fcf2448..952f96d8 100644 --- a/include/siri/db/db.h +++ b/include/siri/db/db.h @@ -8,7 +8,7 @@ typedef struct siridb_s siridb_t; #define SIRIDB_MAX_SIZE_ERR_MSG 1024 #define SIRIDB_MAX_DBNAME_LEN 256 /* 255 + NULL */ -#define SIRIDB_SCHEMA 4 +#define SIRIDB_SCHEMA 5 #define SIRIDB_FLAG_REINDEXING 1 #define DEF_DROP_THRESHOLD 1.0 /* 100% */ @@ -35,6 +35,8 @@ typedef struct siridb_s siridb_t; #include #include #include +#include + int32_t siridb_get_uptime(siridb_t * siridb); int8_t siridb_get_idle_percentage(siridb_t * siridb); @@ -91,6 +93,7 @@ struct siridb_s siridb_reindex_t * reindex; siridb_groups_t * groups; siridb_buffer_t * buffer; + siridb_tee_t * tee; siridb_tasks_t tasks; }; diff --git a/include/siri/db/tee.h b/include/siri/db/tee.h new file mode 100644 index 00000000..36739eed --- /dev/null +++ b/include/siri/db/tee.h @@ -0,0 +1,47 @@ +/* + * tee.h - To tee the data for a SiriDB database. + */ +#ifndef SIRIDB_TEE_H_ +#define SIRIDB_TEE_H_ + +typedef struct siridb_tee_s siridb_tee_t; + +enum +{ + SIRIDB_TEE_FLAG_INIT = 1<<0, + SIRIDB_TEE_FLAG_CONNECTED = 1<<1, + SIRIDB_TEE_FLAG = 1<<31, +}; + +#include +#include +#include + +siridb_tee_t * siridb_tee_new(void); +void siridb_tee_free(siridb_tee_t * tee); +int siridb_tee_connect(siridb_tee_t * tee); +int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name); +void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise); +const char * tee_str(siridb_tee_t * tee); +static inline _Bool siridb_tee_is_configured(siridb_tee_t * tee); +static inline _Bool siridb_tee_is_connected(siridb_tee_t * tee); + +struct siridb_tee_s +{ + uint32_t flags; /* maps to sirnet_stream_t tp for cleanup */ + char * pipe_name_; + char * err_msg_; + uv_pipe_t pipe; +}; + +static inline _Bool siridb_tee_is_configured(siridb_tee_t * tee) +{ + return tee->pipe_name_ != NULL; +}; + +static inline _Bool siridb_tee_is_connected(siridb_tee_t * tee) +{ + return tee->flags & SIRIDB_TEE_FLAG_CONNECTED; +} + +#endif /* SIRIDB_TEE_H_ */ diff --git a/include/siri/grammar/grammar.h b/include/siri/grammar/grammar.h index b7c9033f..45b8a8b2 100644 --- a/include/siri/grammar/grammar.h +++ b/include/siri/grammar/grammar.h @@ -5,17 +5,17 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2018-07-05 16:20:26 + * Created at: 2018-10-29 10:52:57 */ #ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ #define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ #include -cleri_grammar_t * compile_grammar(void); +cleri_grammar_t * compile_siri_grammar_grammar(void); enum cleri_grammar_ids { - CLERI_NONE, /* used for objects with no name */ + CLERI_NONE, // used for objects with no name CLERI_GID_ACCESS_EXPR, CLERI_GID_ACCESS_KEYWORDS, CLERI_GID_AFTER_EXPR, @@ -224,6 +224,7 @@ enum cleri_grammar_ids { CLERI_GID_K_SUM, CLERI_GID_K_SYMMETRIC_DIFFERENCE, CLERI_GID_K_SYNC_PROGRESS, + CLERI_GID_K_TEE_PIPE_NAME, CLERI_GID_K_TIMEIT, CLERI_GID_K_TIMEZONE, CLERI_GID_K_TIME_PRECISION, @@ -288,6 +289,7 @@ enum cleri_grammar_ids { CLERI_GID_SET_PASSWORD, CLERI_GID_SET_PORT, CLERI_GID_SET_SELECT_POINTS_LIMIT, + CLERI_GID_SET_TEE_PIPE_NAME, CLERI_GID_SET_TIMEZONE, CLERI_GID_SHARD_COLUMNS, CLERI_GID_SHOW_STMT, @@ -306,7 +308,7 @@ enum cleri_grammar_ids { CLERI_GID_WHERE_SHARD, CLERI_GID_WHERE_USER, CLERI_GID__BOOLEAN, - CLERI_END /* can be used to get the enum length */ + CLERI_END // can be used to get the enum length }; #endif /* CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ */ diff --git a/include/siri/net/promise.h b/include/siri/net/promise.h index 61f7e057..85273e27 100644 --- a/include/siri/net/promise.h +++ b/include/siri/net/promise.h @@ -30,8 +30,8 @@ typedef void (* sirinet_promise_cb)( const char * sirinet_promise_strstatus(sirinet_promise_status_t status); -#define sirinet_promise_incref(promise) promise->ref++ -#define sirinet_promise_decref(promise) if (!--promise->ref) free(promise) +#define sirinet_promise_incref(p__) (p__)->ref++ +#define sirinet_promise_decref(p__) if (!--(p__)->ref) free(p__) /* the callback will always be called and is responsible to free the promise */ struct sirinet_promise_s diff --git a/include/siri/net/protocol.h b/include/siri/net/protocol.h index 682e5c30..455f6c98 100644 --- a/include/siri/net/protocol.h +++ b/include/siri/net/protocol.h @@ -77,6 +77,7 @@ typedef enum BPROTO_REQ_GROUPS, /* empty */ BPROTO_ENABLE_BACKUP_MODE, /* empty */ BPROTO_DISABLE_BACKUP_MODE, /* empty */ + BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */ } bproto_client_t; /* @@ -123,7 +124,8 @@ typedef enum BPROTO_ACK_DROP_SERIES, /* empty */ BPROTO_ACK_ENABLE_BACKUP_MODE, /* empty */ BPROTO_ACK_DISABLE_BACKUP_MODE, /* empty */ - BPROTO_RES_GROUPS /* [[name, series], ...] */ + BPROTO_RES_GROUPS, /* [[name, series], ...] */ + BPROTO_ACK_TEE_PIPE_NAME /* empty */ } bproto_server_t; diff --git a/include/siri/net/stream.h b/include/siri/net/stream.h index 48afb3d3..2e06b6e2 100644 --- a/include/siri/net/stream.h +++ b/include/siri/net/stream.h @@ -48,7 +48,7 @@ void sirinet__stream_free(uv_stream_t * uvclient); struct sirinet_stream_s { - sirinet_stream_tp_t tp; + uint32_t tp; /* maps to siridb_tee_t flags for cleanup */ uint32_t ref; on_data_cb_t on_data; siridb_t * siridb; diff --git a/siridb.conf b/siridb.conf index 689e11a4..21c6ab23 100644 --- a/siridb.conf +++ b/siridb.conf @@ -53,9 +53,9 @@ optimize_interval = 3600 heartbeat_interval = 30 # -# SiriDB can run fsync on the buffer file on an interval in milliseconds. -# This value is set to 0 by default which tells SiriDB to run fsync after -# each insert request. When having many insert requests per second, it can be +# SiriDB can run fsync on the buffer file on an interval in milliseconds. +# This value is set to 0 by default which tells SiriDB to run fsync after +# each insert request. When having many insert requests per second, it can be # useful to use an interval like 500 milliseconds. # #buffer_sync_interval = 500 @@ -83,3 +83,4 @@ enable_pipe_support = 0 # SiriDB will bind the client named pipe in this location. # pipe_client_name = siridb_client.sock + diff --git a/src/cexpr/cexpr.c b/src/cexpr/cexpr.c index 6bd01fd1..54ce516d 100644 --- a/src/cexpr/cexpr.c +++ b/src/cexpr/cexpr.c @@ -570,8 +570,7 @@ static cexpr_t * CEXPR_new(void) */ static cexpr_condition_t * CEXPR_condition_new(void) { - cexpr_condition_t * condition = - (cexpr_condition_t *) malloc(sizeof(cexpr_condition_t)); + cexpr_condition_t * condition = malloc(sizeof(cexpr_condition_t)); if (condition != NULL) { diff --git a/src/logger/logger.c b/src/logger/logger.c index 6cad9350..e1750ee2 100644 --- a/src/logger/logger.c +++ b/src/logger/logger.c @@ -8,7 +8,7 @@ #include logger_t Logger = { - .level=10, + .level=2, .level_name=NULL, .ostream=NULL, .flags=0 diff --git a/src/qpack/qpack.c b/src/qpack/qpack.c index 8bade381..032bbf25 100644 --- a/src/qpack/qpack.c +++ b/src/qpack/qpack.c @@ -169,14 +169,14 @@ qp_unpacker_t * qp_unpacker_ff(const char * fn) } else { - unpacker = (qp_unpacker_t *) malloc(sizeof(qp_unpacker_t)); + unpacker = malloc(sizeof(qp_unpacker_t)); if (unpacker == NULL) { ERR_ALLOC } else { - unpacker->source = (unsigned char *) malloc(size); + unpacker->source = malloc(size); if (unpacker->source == NULL) { ERR_ALLOC diff --git a/src/siri/db/db.c b/src/siri/db/db.c index 7c6f329c..cf765268 100644 --- a/src/siri/db/db.c +++ b/src/siri/db/db.c @@ -269,6 +269,12 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags) /* start tasks */ siridb_tasks_init(&siridb->tasks); + /* init tee if configured */ + if (siridb_tee_is_configured(siridb->tee)) + { + siridb_tee_connect(siridb->tee); + } + log_info("Finished loading database: '%s'", siridb->dbname); return siridb; @@ -302,7 +308,8 @@ static int siridb__from_unpacker( /* check schema */ if ( qp_schema.via.int64 == 1 || qp_schema.via.int64 == 2 || - qp_schema.via.int64 == 3) + qp_schema.via.int64 == 3 || + qp_schema.via.int64 == 4) { log_info( "Found an old database schema (v%d), " @@ -463,6 +470,37 @@ static int siridb__from_unpacker( (*siridb)->list_limit = qp_obj.via.int64; } + /* for older schemas we keep the default tee_pipe_name=NULL */ + if (qp_schema.via.int64 >= 5) + { + qp_next(unpacker, &qp_obj); + + if (qp_obj.tp == QP_RAW) + { + (*siridb)->tee->pipe_name_ = strndup( + (char *) qp_obj.via.raw, + qp_obj.len); + READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.") + } + else if (qp_obj.tp != QP_NULL) + { + READ_DB_EXIT_WITH_ERROR("Cannot read tee pipe name.") + } + } + if ((*siridb)->tee->pipe_name_ == NULL) + { + log_debug( + "No tee pipe name configured for database: %s", + (*siridb)->dbname); + } + else + { + log_debug( + "Using tee pipe name '%s' for database: '%s'", + (*siridb)->tee->pipe_name_, + (*siridb)->dbname); + } + return (qp_schema.via.int64 == SIRIDB_SCHEMA) ? 0 : qp_schema.via.int64; } @@ -553,6 +591,9 @@ int siridb_save(siridb_t * siridb) qp_fadd_double(fpacker, siridb->drop_threshold) || qp_fadd_int64(fpacker, siridb->select_points_limit) || qp_fadd_int64(fpacker, siridb->list_limit) || + (siridb->tee->pipe_name_ == NULL + ? qp_fadd_type(fpacker, QP_NULL) + : qp_fadd_string(fpacker, siridb->tee->pipe_name_)) || qp_fadd_type(fpacker, QP_ARRAY_CLOSE) || qp_close(fpacker)); } @@ -645,6 +686,11 @@ void siridb__free(siridb_t * siridb) siridb_groups_decref(siridb->groups); } + if (siridb->tee != NULL) + { + siridb_tee_free(siridb->tee); + } + /* unlock the database in case no siri_err occurred */ if (!siri_err) { @@ -673,89 +719,81 @@ static siridb_t * siridb__new(void) siridb_t * siridb = (siridb_t *) malloc(sizeof(siridb_t)); if (siridb == NULL) { - ERR_ALLOC + goto fail0; } - else + + siridb->dbname = NULL; + siridb->dbpath = NULL; + siridb->ref = 1; + siridb->insert_tasks = 0; + siridb->flags = 0; + siridb->time = NULL; + siridb->users = NULL; + siridb->servers = NULL; + siridb->pools = NULL; + siridb->max_series_id = 0; + siridb->received_points = 0; + siridb->selected_points = 0; + siridb->drop_threshold = DEF_DROP_THRESHOLD; + siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT; + siridb->list_limit = DEF_LIST_LIMIT; + siridb->tz = -1; + siridb->server = NULL; + siridb->replica = NULL; + siridb->fifo = NULL; + siridb->replicate = NULL; + siridb->reindex = NULL; + siridb->groups = NULL; + siridb->dropped_fp = NULL; + siridb->store = NULL; + + siridb->series = ct_new(); + if (siridb->series == NULL) { - siridb->series = ct_new(); - if (siridb->series == NULL) - { - ERR_ALLOC - free(siridb); - siridb = NULL; - } - else - { - siridb->series_map = imap_new(); - if (siridb->series_map == NULL) - { - ct_free(siridb->series, NULL); - free(siridb); - siridb = NULL; - ERR_ALLOC - } - else - { - siridb->shards = imap_new(); - if (siridb->shards == NULL) - { - imap_free(siridb->series_map, NULL); - ct_free(siridb->series, NULL); - free(siridb); - siridb = NULL; - ERR_ALLOC - - } - else - { - /* allocate a buffer */ - siridb->buffer = siridb_buffer_new(); - if (siridb->buffer == NULL) - { - imap_free(siridb->shards, NULL); - imap_free(siridb->series_map, NULL); - ct_free(siridb->series, NULL); - free(siridb); - siridb = NULL; - ERR_ALLOC - } - else - { - siridb->dbname = NULL; - siridb->dbpath = NULL; - siridb->ref = 1; - siridb->insert_tasks = 0; - siridb->flags = 0; - siridb->time = NULL; - siridb->users = NULL; - siridb->servers = NULL; - siridb->pools = NULL; - siridb->max_series_id = 0; - siridb->received_points = 0; - siridb->selected_points = 0; - siridb->drop_threshold = DEF_DROP_THRESHOLD; - siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT; - siridb->list_limit = DEF_LIST_LIMIT; - siridb->tz = -1; - siridb->server = NULL; - siridb->replica = NULL; - siridb->fifo = NULL; - siridb->replicate = NULL; - siridb->reindex = NULL; - siridb->groups = NULL; - - /* make file pointers are NULL when file is closed */ - siridb->dropped_fp = NULL; - siridb->store = NULL; - - uv_mutex_init(&siridb->series_mutex); - uv_mutex_init(&siridb->shards_mutex); - } - } - } - } + goto fail0; + } + + siridb->series_map = imap_new(); + if (siridb->series_map == NULL) + { + goto fail1; } + siridb->shards = imap_new(); + if (siridb->shards == NULL) + { + goto fail2; + } + /* allocate a buffer */ + siridb->buffer = siridb_buffer_new(); + if (siridb->buffer == NULL) + { + goto fail3; + } + + /* allocate tee */ + siridb->tee = siridb_tee_new(); + if (siridb->tee == NULL) + { + goto fail4; + } + + uv_mutex_init(&siridb->series_mutex); + uv_mutex_init(&siridb->shards_mutex); + return siridb; + +fail4: + siridb_buffer_free(siridb->buffer); +fail3: + imap_free(siridb->shards, NULL); +fail2: + imap_free(siridb->series_map, NULL); +fail1: + ct_free(siridb->series, NULL); +fail0: + free(siridb); + ERR_ALLOC + return NULL; } static siridb_t * siridb__from_dat(const char * dbpath) diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index 6c9c4551..920d0b39 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -284,16 +284,14 @@ int insert_init_backend_local( sirinet_pkg_t * pkg, uint8_t flags) { - sirinet_promise_t * promise = - (sirinet_promise_t *) malloc(sizeof(sirinet_promise_t)); + sirinet_promise_t * promise = malloc(sizeof(sirinet_promise_t)); if (promise == NULL) { ERR_ALLOC return -1; } - siridb_insert_local_t * ilocal = - (siridb_insert_local_t *) malloc(sizeof(siridb_insert_local_t)); + siridb_insert_local_t * ilocal = malloc(sizeof(siridb_insert_local_t)); if (ilocal == NULL) { free(promise); @@ -301,7 +299,7 @@ int insert_init_backend_local( return -1; } - uv_async_t * handle = (uv_async_t *) malloc(sizeof(uv_async_t)); + uv_async_t * handle = malloc(sizeof(uv_async_t)); if (handle == NULL) { free(promise); @@ -347,6 +345,11 @@ int insert_init_backend_local( siridb_tasks_inc(siridb->tasks); siridb->insert_tasks++; + if (siridb_tee_is_connected(siridb->tee)) + { + siridb_tee_write(siridb->tee, promise); + } + uv_async_init(siri.loop, handle, INSERT_local_task); uv_async_send(handle); diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index 70ab542b..bc3ead7c 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -154,8 +154,12 @@ if (IS_MASTER && siridb_is_reindexing(siridb)) \ "Successfully dropped server '%s'." #define MSG_SUCCES_SET_LOG_LEVEL_MULTI \ "Successfully set log level to '%s' on %lu servers." +#define MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI \ + "Successfully set tee_pipe name on %lu servers." #define MSG_SUCCES_SET_LOG_LEVEL \ "Successfully set log level to '%s' on '%s'." +#define MSG_SUCCES_SET_TEE_PIPE_NAME \ + "Successfully set tee pipe name to '%s' on '%s'." #define MSG_SUCCESS_SET_SELECT_POINTS_LIMIT \ "Successfully changed select points limit from %" PRIu32 " to %" PRIu32 "." #define MSG_SUCCES_DROP_SERIES \ @@ -245,6 +249,7 @@ static void exit_set_list_limit(uv_async_t * handle); static void exit_set_log_level(uv_async_t * handle); static void exit_set_port(uv_async_t * handle); static void exit_set_select_points_limit(uv_async_t * handle); +static void exit_set_tee_pipe_name(uv_async_t * handle); static void exit_set_timezone(uv_async_t * handle); static void exit_show_stmt(uv_async_t * handle); static void exit_timeit_stmt(uv_async_t * handle); @@ -477,6 +482,7 @@ void siridb_init_listener(void) siridb_listen_exit[CLERI_GID_SET_LOG_LEVEL] = exit_set_log_level; siridb_listen_exit[CLERI_GID_SET_PORT] = exit_set_port; siridb_listen_exit[CLERI_GID_SET_SELECT_POINTS_LIMIT] = exit_set_select_points_limit; + siridb_listen_exit[CLERI_GID_SET_TEE_PIPE_NAME] = exit_set_tee_pipe_name; siridb_listen_exit[CLERI_GID_SET_TIMEZONE] = exit_set_timezone; siridb_listen_exit[CLERI_GID_SHOW_STMT] = exit_show_stmt; siridb_listen_exit[CLERI_GID_TIMEIT_STMT] = exit_timeit_stmt; @@ -4016,6 +4022,125 @@ static void exit_set_select_points_limit(uv_async_t * handle) } } +static void exit_set_tee_pipe_name(uv_async_t * handle) +{ + siridb_query_t * query = (siridb_query_t *) handle->data; + query_alter_t * q_alter = (query_alter_t *) query->data; + siridb_t * siridb = query->client->siridb; + + assert (query->data != NULL); + + cleri_node_t * node = + query->nodes->node->children->next->next->node->children->node; + + char pipe_name[node->len - 1]; + xstr_extract_string(pipe_name, node->str, node->len); + + if (q_alter->alter_tp == QUERY_ALTER_SERVERS) + { + /* + * alter_servers + */ + cexpr_t * where_expr = ((query_list_t *) query->data)->where_expr; + siridb_server_walker_t wserver = { + .server=siridb->server, + .siridb=siridb + }; + + if (where_expr == NULL || cexpr_run( + where_expr, + (cexpr_cb_t) siridb_server_cexpr_cb, + &wserver)) + { + siridb_tee_set_pipe_name(siridb->tee, pipe_name); + q_alter->n++; + } + + if (IS_MASTER) + { + /* + * This is a trick because we share with setting log level on + * multiple servers at once. + */ + q_alter->n += LOGGER_NUM_LEVELS << 16; + siridb_query_forward( + handle, + SIRIDB_QUERY_FWD_SERVERS, + (sirinet_promises_cb) on_alter_xxx_response, + 0); + } + else + { + qp_add_raw(query->packer, (const unsigned char *) "servers", 7); + qp_add_int64(query->packer, q_alter->n); + SIRIPARSER_ASYNC_NEXT_NODE + } + } + else + { + /* + * alter_server + * + * we can set the success message, we just ignore the message in case + * an error occurs. + */ + siridb_server_t * server = q_alter->via.server; + + QP_ADD_SUCCESS + qp_add_fmt_safe(query->packer, + MSG_SUCCES_SET_TEE_PIPE_NAME, + pipe_name, + server->name); + + if (server == siridb->server) + { + (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); + + SIRIPARSER_ASYNC_NEXT_NODE + } + else + { + + if (siridb_server_is_online(server)) + { + sirinet_pkg_t * pkg = sirinet_pkg_new( + 0, + strlen(pipe_name), + BPROTO_TEE_PIPE_NAME_UPDATE, + (unsigned char *) pipe_name); + if (pkg != NULL) + { + /* handle will be bound to a timer so we should increment */ + siri_async_incref(handle); + if (siridb_server_send_pkg( + server, + pkg, + 0, + (sirinet_promise_cb) on_ack_response, + handle, + 0)) + { + /* + * signal is raised and 'on_ack_response' will not be + * called + */ + free(pkg); + siri_async_decref(&handle); + } + } + } + else + { + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Cannot set pipe name, '%s' is currently unavailable", + server->name); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + } + } + } +} + static void exit_set_timezone(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; @@ -4951,7 +5076,9 @@ static void on_ack_response( case BPROTO_ACK_DISABLE_BACKUP_MODE: /* success message is already set */ break; - + case BPROTO_ACK_TEE_PIPE_NAME: + /* success message is already set */ + break; default: status = PROMISE_PKG_TYPE_ERROR; break; @@ -5029,19 +5156,32 @@ static void on_alter_xxx_response(vec_t * promises, uv_async_t * handle) } /* * Note: since this function has the sole purpose for alter servers - * and setting log levels, we now simply ad the message here. + * and setting log levels or pipe name, we now simply add the + * message here. */ QP_ADD_SUCCESS - log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI, - logger_level_name(q_alter->n >> 16), - q_alter->n & 0xffff); + if ((q_alter->n >> 16) >= LOGGER_NUM_LEVELS) + { + log_info(MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI, q_alter->n & 0xffff); + qp_add_fmt_safe( + query->packer, + MSG_SUCCES_SET_TEE_PIPE_NAME_MULTI, + q_alter->n & 0xffff); + } + else + { + log_info(MSG_SUCCES_SET_LOG_LEVEL_MULTI, + logger_level_name(q_alter->n >> 16), + q_alter->n & 0xffff); + + qp_add_fmt_safe( + query->packer, + MSG_SUCCES_SET_LOG_LEVEL_MULTI, + logger_level_name(q_alter->n >> 16), + q_alter->n & 0xffff); + } - qp_add_fmt_safe( - query->packer, - MSG_SUCCES_SET_LOG_LEVEL_MULTI, - logger_level_name(q_alter->n >> 16), - q_alter->n & 0xffff); SIRIPARSER_ASYNC_NEXT_NODE } diff --git a/src/siri/db/props.c b/src/siri/db/props.c index 596a3e13..5ac92a5c 100644 --- a/src/siri/db/props.c +++ b/src/siri/db/props.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -139,6 +140,10 @@ static void prop_sync_progress( siridb_t * siridb, qp_packer_t * packer, int map); +static void prop_tee_pipe_name( + siridb_t * siridb, + qp_packer_t * packer, + int map); static void prop_timezone( siridb_t * siridb, qp_packer_t * packer, @@ -231,6 +236,8 @@ void siridb_init_props(void) prop_status; siridb_props[CLERI_GID_K_SYNC_PROGRESS - KW_OFFSET] = prop_sync_progress; + siridb_props[CLERI_GID_K_TEE_PIPE_NAME - KW_OFFSET] = + prop_tee_pipe_name; siridb_props[CLERI_GID_K_TIMEZONE - KW_OFFSET] = prop_timezone; siridb_props[CLERI_GID_K_TIME_PRECISION - KW_OFFSET] = @@ -499,6 +506,15 @@ static void prop_sync_progress( qp_add_string(packer, siridb_initsync_sync_progress(siridb)); } +static void prop_tee_pipe_name( + siridb_t * siridb, + qp_packer_t * packer, + int map) +{ + SIRIDB_PROP_MAP("tee_pipe_name", 13) + qp_add_string(packer, tee_str(siridb->tee)); +} + static void prop_timezone( siridb_t * siridb, qp_packer_t * packer, diff --git a/src/siri/db/server.c b/src/siri/db/server.c index 8272072d..800f7434 100644 --- a/src/siri/db/server.c +++ b/src/siri/db/server.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -479,8 +480,7 @@ static int SERVER_resolve_dns( hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_NUMERICSERV; - uv_getaddrinfo_t * resolver = - (uv_getaddrinfo_t *) malloc(sizeof(uv_getaddrinfo_t)); + uv_getaddrinfo_t * resolver = malloc(sizeof(uv_getaddrinfo_t)); if (resolver == NULL) { @@ -519,7 +519,7 @@ static void SERVER_on_resolved( int status, struct addrinfo * res) { - siridb_server_t * server = (siridb_server_t *) resolver->data; + siridb_server_t * server = resolver->data; if (status < 0) { @@ -1173,6 +1173,12 @@ int siridb_server_cexpr_cb( cond->operator, siridb_initsync_sync_progress(wserver->siridb), cond->str); + + case CLERI_GID_K_TEE_PIPE_NAME: + return cexpr_str_cmp( + cond->operator, + tee_str(wserver->siridb->tee), + cond->str); } log_critical("Unexpected server property received: %d", cond->prop); diff --git a/src/siri/db/servers.c b/src/siri/db/servers.c index 065d6dc7..56d26a8f 100644 --- a/src/siri/db/servers.c +++ b/src/siri/db/servers.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -697,6 +698,9 @@ int siridb_servers_list(siridb_server_t * server, uv_async_t * handle) case CLERI_GID_K_SYNC_PROGRESS: qp_add_string(query->packer, siridb_initsync_sync_progress(siridb)); break; + case CLERI_GID_K_TEE_PIPE_NAME: + qp_add_string(query->packer, tee_str(siridb->tee)); + break; case CLERI_GID_K_UPTIME: qp_add_int64( query->packer, diff --git a/src/siri/db/tee.c b/src/siri/db/tee.c new file mode 100644 index 00000000..53070f2d --- /dev/null +++ b/src/siri/db/tee.c @@ -0,0 +1,223 @@ +/* + * tee.c - To tee the data for a SiriDB database. + */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include +#include +#include +#include +#include + +#define TEE__BUF_SZ 512 +static char tee__buf[TEE__BUF_SZ]; + +static void tee__runtime_init(uv_pipe_t * pipe); +static void tee__write_cb(uv_write_t * req, int status); +static void tee__on_connect(uv_connect_t * req, int status); +static void tee__alloc_buffer( + uv_handle_t * handle, + size_t suggsz, + uv_buf_t * buf); +static void tee__on_data( + uv_stream_t * client, + ssize_t nread, + const uv_buf_t * buf); + +siridb_tee_t * siridb_tee_new(void) +{ + siridb_tee_t * tee = malloc(sizeof(siridb_tee_t)); + if (tee == NULL) + { + return NULL; + } + tee->pipe_name_ = NULL; + tee->err_msg_ = NULL; + tee->pipe.data = tee; + tee->flags = SIRIDB_TEE_FLAG; + return tee; +} + +void siridb_tee_free(siridb_tee_t * tee) +{ + free(tee->err_msg_); + free(tee->pipe_name_); + free(tee); +} + +int siridb_tee_connect(siridb_tee_t * tee) +{ + uv_connect_t * req = malloc(sizeof(uv_connect_t)); + if (req == NULL) + { + return -1; + } + + req->data = tee; + + if (uv_pipe_init(siri.loop, &tee->pipe, 0)) + { + return -1; + } + tee->flags |= SIRIDB_TEE_FLAG_INIT; + tee->pipe.data = tee; + + free(tee->err_msg_); + + uv_pipe_connect(req, &tee->pipe, tee->pipe_name_, tee__on_connect); + return 0; +} + +int siridb_tee_set_pipe_name(siridb_tee_t * tee, const char * pipe_name) +{ + free(tee->pipe_name_); + tee->pipe_name_ = strdup(pipe_name); + if (!tee->pipe_name_) + { + return -1; + } + if (tee->flags & SIRIDB_TEE_FLAG_INIT) + { + uv_close((uv_handle_t *) &tee->pipe, (uv_close_cb) tee__runtime_init); + } + else + { + tee__runtime_init(&tee->pipe); + } + return 0; +} + +void siridb_tee_write(siridb_tee_t * tee, sirinet_promise_t * promise) +{ + uv_write_t * req = malloc(sizeof(uv_write_t)); + if (!req) + { + log_error("Cannot allocate memory for tee request"); + return; + } + + req->data = promise; + sirinet_promise_incref(promise); + + uv_buf_t wrbuf = uv_buf_init( + (char *) promise->pkg, + sizeof(sirinet_pkg_t) + promise->pkg->len); + + if (uv_write(req, (uv_stream_t *) &tee->pipe, &wrbuf, 1, tee__write_cb)) + { + log_error("Cannot write to tee"); + sirinet_promise_decref(promise); + } +} + +const char * tee_str(siridb_tee_t * tee) +{ + if (tee->err_msg_) + { + return tee->err_msg_; + } + if (tee->pipe_name_) + { + return tee->pipe_name_; + } + return "disabled"; +} + + +static void tee__runtime_init(uv_pipe_t * pipe) +{ + siridb_tee_t * tee = pipe->data; + + tee->flags &= ~SIRIDB_TEE_FLAG_INIT; + tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED; + + if (siridb_tee_connect(tee)) + { + log_error("Could not connect to tee at runtime"); + } +} + +static void tee__write_cb(uv_write_t * req, int status) +{ + sirinet_promise_t * promise = req->data; + sirinet_promise_decref(promise); + if (status) + { + log_error("Socket (tee) write error: %s", uv_strerror(status)); + } + free(req); +} + +static void tee__on_connect(uv_connect_t * req, int status) +{ + siridb_tee_t * tee = req->data; + + if (status == 0) + { + log_info("Connection created to pipe: '%s'", tee->pipe_name_); + if (uv_read_start(req->handle, tee__alloc_buffer, tee__on_data)) + { + if (asprintf(&tee->err_msg_, + "Cannot open pipe '%s' for reading", + tee->pipe_name_) >= 0) + { + log_error(tee->err_msg_); + } + } + else + { + tee->flags |= SIRIDB_TEE_FLAG_CONNECTED; + } + } + else + { + if (asprintf( + &tee->err_msg_, + "Cannot connect to pipe '%s' (%s)", + tee->pipe_name_, + uv_strerror(status)) >= 0) + { + log_error(tee->err_msg_); + } + } + free(req); +} + +static void tee__alloc_buffer( + uv_handle_t * handle __attribute__((unused)), + size_t suggsz __attribute__((unused)), + uv_buf_t * buf) +{ + buf->base = tee__buf; + buf->len = TEE__BUF_SZ; +} + + + +static void tee__on_data( + uv_stream_t * client, + ssize_t nread, + const uv_buf_t * buf __attribute__((unused))) +{ + siridb_tee_t * tee = client->data; + + if (nread < 0) + { + if (nread != UV_EOF) + { + log_error("Read error on pipe '%s' : '%s'", + sirinet_pipe_name((uv_pipe_t * ) client), + uv_err_name(nread)); + } + log_info("Disconnected from tee pipe: '%s'", + sirinet_pipe_name((uv_pipe_t * ) client)); + tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED; + uv_close((uv_handle_t *) client, NULL); + } + + if (nread > 0) + { + log_debug("Got %zd bytes on tee which will be ignored", nread); + } +} diff --git a/src/siri/grammar/grammar.c b/src/siri/grammar/grammar.c index cdc883eb..0f14df21 100644 --- a/src/siri/grammar/grammar.c +++ b/src/siri/grammar/grammar.c @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2018-07-05 16:20:26 + * Created at: 2018-10-29 10:52:57 */ #include "siri/grammar/grammar.h" @@ -17,7 +17,7 @@ #define CLERI_FIRST_MATCH 0 #define CLERI_MOST_GREEDY 1 -cleri_grammar_t * compile_grammar(void) +cleri_grammar_t * compile_siri_grammar_grammar(void) { cleri_t * r_float = cleri_regex(CLERI_GID_R_FLOAT, "^[-+]?[0-9]*\\.?[0-9]+"); cleri_t * r_integer = cleri_regex(CLERI_GID_R_INTEGER, "^[-+]?[0-9]+"); @@ -160,6 +160,7 @@ cleri_grammar_t * compile_grammar(void) cleri_keyword(CLERI_NONE, "symmetric_difference", CLERI_CASE_SENSITIVE) ); cleri_t * k_sync_progress = cleri_keyword(CLERI_GID_K_SYNC_PROGRESS, "sync_progress", CLERI_CASE_SENSITIVE); + cleri_t * k_tee_pipe_name = cleri_keyword(CLERI_GID_K_TEE_PIPE_NAME, "tee_pipe_name", CLERI_CASE_SENSITIVE); cleri_t * k_timeit = cleri_keyword(CLERI_GID_K_TIMEIT, "timeit", CLERI_CASE_SENSITIVE); cleri_t * k_timezone = cleri_keyword(CLERI_GID_K_TIMEZONE, "timezone", CLERI_CASE_SENSITIVE); cleri_t * k_time_precision = cleri_keyword(CLERI_GID_K_TIME_PRECISION, "time_precision", CLERI_CASE_SENSITIVE); @@ -302,7 +303,7 @@ cleri_grammar_t * compile_grammar(void) cleri_t * server_columns = cleri_list(CLERI_GID_SERVER_COLUMNS, cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 28, + 29, k_address, k_buffer_path, k_buffer_size, @@ -330,6 +331,7 @@ cleri_grammar_t * compile_grammar(void) k_reindex_progress, k_selected_points, k_sync_progress, + k_tee_pipe_name, k_uptime ), cleri_token(CLERI_NONE, ","), 1, 0, 0); cleri_t * group_columns = cleri_list(CLERI_GID_GROUP_COLUMNS, cleri_choice( @@ -562,7 +564,7 @@ cleri_grammar_t * compile_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 11, + 12, k_address, k_buffer_path, k_dbpath, @@ -573,7 +575,8 @@ cleri_grammar_t * compile_grammar(void) k_version, k_status, k_reindex_progress, - k_sync_progress + k_sync_progress, + k_tee_pipe_name ), str_operator, string @@ -1044,6 +1047,19 @@ cleri_grammar_t * compile_grammar(void) k_address, string ); + cleri_t * set_tee_pipe_name = cleri_sequence( + CLERI_GID_SET_TEE_PIPE_NAME, + 3, + k_set, + k_tee_pipe_name, + cleri_choice( + CLERI_NONE, + CLERI_FIRST_MATCH, + 2, + k_false, + string + ) + ); cleri_t * set_backup_mode = cleri_sequence( CLERI_GID_SET_BACKUP_MODE, 3, @@ -1156,9 +1172,10 @@ cleri_grammar_t * compile_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 4, + 5, set_log_level, set_backup_mode, + set_tee_pipe_name, set_address, set_port ) @@ -1168,7 +1185,13 @@ cleri_grammar_t * compile_grammar(void) 3, k_servers, cleri_optional(CLERI_NONE, where_server), - set_log_level + cleri_choice( + CLERI_NONE, + CLERI_FIRST_MATCH, + 2, + set_log_level, + set_tee_pipe_name + ) ); cleri_t * alter_user = cleri_sequence( CLERI_GID_ALTER_USER, @@ -1484,7 +1507,7 @@ cleri_grammar_t * compile_grammar(void) cleri_list(CLERI_NONE, cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 34, + 35, k_active_handles, k_active_tasks, k_buffer_path, @@ -1513,6 +1536,7 @@ cleri_grammar_t * compile_grammar(void) k_startup_time, k_status, k_sync_progress, + k_tee_pipe_name, k_time_precision, k_timezone, k_uptime, diff --git a/src/siri/heartbeat.c b/src/siri/heartbeat.c index 7c659f8b..f4ff0dce 100644 --- a/src/siri/heartbeat.c +++ b/src/siri/heartbeat.c @@ -59,6 +59,12 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused))) { siridb = (siridb_t *) siridb_node->data; + if ( siridb_tee_is_configured(siridb->tee) && + !siridb_tee_is_connected(siridb->tee)) + { + siridb_tee_connect(siridb->tee); + } + server_node = siridb->servers->first; while (server_node != NULL) { diff --git a/src/siri/net/bserver.c b/src/siri/net/bserver.c index f2a36b86..816687ff 100644 --- a/src/siri/net/bserver.c +++ b/src/siri/net/bserver.c @@ -43,6 +43,9 @@ static void on_flags_update(sirinet_stream_t * client, sirinet_pkg_t * pkg); static void on_log_level_update( sirinet_stream_t * client, sirinet_pkg_t * pkg); +static void on_tee_pipe_name_update( + sirinet_stream_t * client, + sirinet_pkg_t * pkg); static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg); static void on_query( sirinet_stream_t * client, @@ -273,6 +276,9 @@ static void on_data(sirinet_stream_t * client, sirinet_pkg_t * pkg) case BPROTO_DISABLE_BACKUP_MODE: on_disable_backup_mode(client, pkg); break; + case BPROTO_TEE_PIPE_NAME_UPDATE: + on_tee_pipe_name_update(client, pkg); + break; } } @@ -428,6 +434,27 @@ static void on_log_level_update(sirinet_stream_t * client, sirinet_pkg_t * pkg) } } +static void on_tee_pipe_name_update( + sirinet_stream_t * client, + sirinet_pkg_t * pkg) +{ + SERVER_CHECK_AUTHENTICATED(client, server); + siridb_t * siridb = client->siridb; + sirinet_pkg_t * package; + char * pipe_name = strndup((const char *) pkg->data, pkg->len); + if (pipe_name != NULL) + { + (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); + } + + package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_TEE_PIPE_NAME, NULL); + if (package != NULL) + { + /* ignore result code, signal can be raised */ + sirinet_pkg_send(client, package); + } +} + static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg) { SERVER_CHECK_AUTHENTICATED(client, server) diff --git a/src/siri/net/pipe.c b/src/siri/net/pipe.c index 8ca2d405..55b028b8 100644 --- a/src/siri/net/pipe.c +++ b/src/siri/net/pipe.c @@ -32,7 +32,7 @@ char * sirinet_pipe_name(uv_pipe_t * client) } /* - * Cleanup socket (pipe) file. (Unused) + * Cleanup socket (pipe) file. (UNUSED) */ void sirinet_pipe_unlink(uv_pipe_t * client) { diff --git a/src/siri/net/pkg.c b/src/siri/net/pkg.c index 883c3c9d..c94ee780 100644 --- a/src/siri/net/pkg.c +++ b/src/siri/net/pkg.c @@ -135,7 +135,7 @@ sirinet_pkg_t * sirinet_pkg_err( */ int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg) { - uv_write_t * req = (uv_write_t *) malloc(sizeof(uv_write_t)); + uv_write_t * req = malloc(sizeof(uv_write_t)); if (req == NULL) { @@ -144,7 +144,7 @@ int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg) return -1; } - pkg_send_t * data = (pkg_send_t *) malloc(sizeof(pkg_send_t)); + pkg_send_t * data = malloc(sizeof(pkg_send_t)); if (data == NULL) { @@ -168,7 +168,13 @@ int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg) (char *) pkg, sizeof(sirinet_pkg_t) + pkg->len); - uv_write(req, client->stream, &wrbuf, 1, PKG_write_cb); + if (uv_write(req, client->stream, &wrbuf, 1, PKG_write_cb)) + { + sirinet_stream_decref(data->client); + free(pkg); + free(req); + return -1; + } return 0; } @@ -180,7 +186,7 @@ int sirinet_pkg_send(sirinet_stream_t * client, sirinet_pkg_t * pkg) sirinet_pkg_t * sirinet_pkg_dup(sirinet_pkg_t * pkg) { size_t size = sizeof(sirinet_pkg_t) + pkg->len; - sirinet_pkg_t * dup = (sirinet_pkg_t *) malloc(size); + sirinet_pkg_t * dup = malloc(size); if (dup == NULL) { ERR_ALLOC diff --git a/src/siri/net/protocol.c b/src/siri/net/protocol.c index 856948ce..b9b6122b 100644 --- a/src/siri/net/protocol.c +++ b/src/siri/net/protocol.c @@ -84,6 +84,7 @@ const char * sirinet_bproto_client_str(bproto_client_t n) case BPROTO_REQ_GROUPS: return "BPROTO_REQ_GROUPS"; case BPROTO_ENABLE_BACKUP_MODE: return "BPROTO_ENABLE_BACKUP_MODE"; case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE"; + case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE"; default: sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n); return protocol_str; @@ -119,6 +120,7 @@ const char * sirinet_bproto_server_str(bproto_server_t n) case BPROTO_ACK_ENABLE_BACKUP_MODE: return "BPROTO_ACK_ENABLE_BACKUP_MODE"; case BPROTO_ACK_DISABLE_BACKUP_MODE: return "BPROTO_ACK_DISABLE_BACKUP_MODE"; case BPROTO_RES_GROUPS: return "BPROTO_RES_GROUPS"; + case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME"; default: sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n); return protocol_str; diff --git a/src/siri/net/stream.c b/src/siri/net/stream.c index 30f1d293..2c522775 100644 --- a/src/siri/net/stream.c +++ b/src/siri/net/stream.c @@ -85,7 +85,7 @@ sirinet_stream_t * sirinet_stream_new(sirinet_stream_tp_t tp, on_data_cb_t cb) */ char * sirinet_stream_name(sirinet_stream_t * client) { - switch (client->tp) + switch ((sirinet_stream_tp_t) client->tp) { case STREAM_TCP_CLIENT: case STREAM_TCP_BACKEND: @@ -242,7 +242,7 @@ void sirinet__stream_free(uv_stream_t * uvclient) { sirinet_stream_t * client = uvclient->data; - switch (client->tp) + switch ((sirinet_stream_tp_t) client->tp) { case STREAM_PIPE_CLIENT: case STREAM_TCP_CLIENT: /* listens to client connections */ diff --git a/src/siri/siri.c b/src/siri/siri.c index 11e42256..4cdd1774 100644 --- a/src/siri/siri.c +++ b/src/siri/siri.c @@ -142,7 +142,7 @@ int siri_start(void) siridb_init_aggregates(); /* load SiriDB grammar */ - siri.grammar = compile_grammar(); + siri.grammar = compile_siri_grammar_grammar(); /* create store for SiriDB instances */ siri.siridb_list = llist_new(); @@ -495,13 +495,16 @@ static void SIRI_walk_close_handlers( case UV_TCP: case UV_NAMED_PIPE: - if (handle->data == NULL) { - uv_close(handle, NULL); - } - else - { - sirinet_stream_decref((sirinet_stream_t *) handle->data); + sirinet_stream_t * stream = handle->data; + if (stream == NULL || (stream->tp & SIRIDB_TEE_FLAG)) + { + uv_close(handle, NULL); + } + else + { + sirinet_stream_decref(stream); + } } break;